【AWS CDK】Step Functions内のLambdaをGlue (Python Shell)に置き換えてみた
データアナリティクス事業本部のueharaです。
今回は、Step Functions内のLambdaをGlue (Python Shell)にAWS CDKを利用して置き換えてみたいと思います。
はじめに
Lambdaには最大で15分という実行時間の制限があります。
したがって、最初はLambdaで行っていた処理も処理が肥大化してくると「実行時間制限に収まらない!」といったケースが出てきます。
今回はStep Functions内で利用されているLambdaをGlueに置き換える処理をAWS CDKを用いてやってみたいと思います。
イメージ図
以下で説明する全体のコードはGitHubにアップロードしています。
移行に際しての観点
Step Functions内のLambdaについて、Glue (Python Shell)への置き換えでは以下の事項を考慮する必要があります。
- Lambda Layerでパッケージしていたものはどうするか?
- Lambdaのソースとしてアップロードしていた共通のpyモジュールのディレクトリはどうするか?
lambda_handler
でevent
として受け取っていたステートマシンの入力をどうするか?- ステートマシンへの出力として
lambda_handler
の返り値として渡していたものをどうするか? - Lambdaの環境変数と設定していたものはどうするか?
以上を踏まえ、元のLambdaのスクリプトの修正を最小限にしつつ、Glue (Python Shell)へ移行してみたいと思います。
移行前の構成を作成(Lambda)
まず、AWS CDKで移行前の構成を作成します。
プロジェクトディレクトリは以下の通りです。
※ node_modules
, cdk.out
ディレクトリについては省略しています。
.
├── bin
│ └── cdk-my-sample-app.ts
├── lib
│ ├── cdk-my-sample-app-stack.ts
│ ├── iam_role.ts
│ ├── lambda.ts
│ └── stepfunctions_with_lambda.ts
├── resources
│ ├── lambda
│ │ ├── common
│ │ │ ├── __init__.py
│ │ │ └── my_utils.py
│ │ ├── end_handler.py
│ │ └── start_handler.py
│ └── lambda_layer
│ └── requirements.txt
├── test
│ └── cdk-sample-app.test.ts
├── README.md
├── cdk.json
├── jest.config.js
├── package-lock.json
├── package.json
└── tsconfig.json
以下、主要なファイルのみ説明を行います。
主要ファイルの説明
lib/lambda.ts
lambda.ts
は以下です。
import { Duration } from 'aws-cdk-lib';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
type dict = { [key: string]: any; }
export function createLambdaFunction(scope: Construct, id: string, role: iam.Role, layers: lambda.ILayerVersion[], environment: dict = {}): lambda.Function {
const lambdaSourceName = id.replace(/-/g, '_')
return new lambda.Function(scope, id, {
functionName: `${id}-test`,
runtime: lambda.Runtime.PYTHON_3_9,
handler: `${lambdaSourceName}.lambda_handler`,
code: lambda.Code.fromAsset('resources/lambda'),
memorySize: 128,
timeout: Duration.seconds(900),
role: role,
layers: layers,
environment: environment,
});
}
使用するhandlerやLambda Layer、環境変数の値を受け取り、 aws_lambda.Function
クラスを返す関数を定義しています。
Lambda関数にデプロイする資材は resources/lambda
配下のものを指定しています。したがって、 my_utils.py
を含む common
ディレクトリと、各種handlerの .py
ファイルがアップロードされます。
lib/stepfunctions_with_lambda.ts
stepfunctions_with_lambda.ts
は以下です。
import * as cdk from 'aws-cdk-lib';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { Construct } from 'constructs';
export function createStepFunctions(
scope: Construct,
id: string,
stateMachineRole: iam.Role,
startLambda: cdk.aws_lambda.Function,
endLambda: cdk.aws_lambda.Function,
): sfn.StateMachine {
const retry: sfn.RetryProps = {
errors: ['States.ALL'],
maxAttempts: 2,
}
// Lambda タスク
const startTask = new tasks.LambdaInvoke(scope, `${id}_start_task`, {
lambdaFunction: startLambda,
outputPath: '$.Payload',
}).addRetry(retry)
const endTask = new tasks.LambdaInvoke(scope, `${id}_end_task`, {
lambdaFunction: endLambda,
outputPath: '$.Payload',
}).addRetry(retry);
// ステートマシンの定義
const definition = startTask.next(endTask);
return new sfn.StateMachine(scope, id, {
stateMachineName: `${id}`,
definitionBody: sfn.DefinitionBody.fromChainable(definition),
timeout: cdk.Duration.hours(1),
role: stateMachineRole,
});
}
2つの aws_lambda.Function
クラスを受け取り、連結しています。(冒頭で示したイメージ図の通りです)
lib/cdk-my-sample-app-stack.ts
cdk-my-sample-app-stack.ts
は以下です。
import * as lambdaPython from '@aws-cdk/aws-lambda-python-alpha';
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import { createLambdaRole, createStepFunctionsRole } from './iam_role';
import { createLambdaFunction } from './lambda';
import { createStepFunctions } from './stepfunctions_with_lambda';
export class CdkMySampleAppStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// LambdaLayerの作成
const baseLayer = new lambdaPython.PythonLayerVersion(this, 'BaseLayer', {
entry: 'resources/lambda_layer',
compatibleRuntimes: [lambda.Runtime.PYTHON_3_9],
description: 'The layer containing Python dependencies',
});
const layers = [baseLayer]
// Lambda IAM Role
const lambdaRole = createLambdaRole(this, 'my-lambda-role');
// 環境変数の定義
const env_vars = {
SAVE_BUCKET_NAME: 'uehara-test-bucket',
}
// Lambda Function
const startLambda = createLambdaFunction(this, 'start-handler', lambdaRole, layers, env_vars);
const endLambda = createLambdaFunction(this, 'end-handler', lambdaRole, layers, {})
// Step Functions IAM Role
const stepFunctionsRole = createStepFunctionsRole(this, 'my-sfn-role');
// Step Functions with Lambda
const stepFunctions = createStepFunctions(this, 'my-sfn-with-lambda', stepFunctionsRole, startLambda, endLambda);
}
}
基本的にはコメントに記載している通りに定義をしています。
Lambda Layerについては resources/lambda_layer
配下にある requirements.txt
を参照して作成しています。
resources/lambda 配下のファイル
resources/lambda
配下のファイルを以下にまとめて記載します。
pytz==2024.1
requirements.txt
は pytz
モジュールのみ記載しています。
def get_hello_str(name):
return f"Hello! {name}"
サンプルとして、 my_utils.py
には名前を引数として受け取ると Hello! <名前>
という文字列を返す関数を記載します。
import json
import os
from common import my_utils
SAVE_BUCKET_NAME = os.getenv("SAVE_BUCKET_NAME", "")
def lambda_handler(event, context):
print(f"event: {json.dumps(event)}")
name = event.get("name", "")
hello_str = my_utils.get_hello_str(name)
print(f"{hello_str}")
s3_uri = f"s3://{SAVE_BUCKET_NAME}/test"
return {
"result": "success",
"s3_uri": s3_uri,
}
この start_handler.py
が今回Glueへ移行する対象のスクリプトになります。
大したことはしていませんが、
- Step Functionsの入力パラメータの利用
- 共通モジュールの利用
- 環境変数の利用
- Step Functionsの出力パラメータの利用
を行うように構成しています。
後続の end_handler.py
は前段の出力を表示するだけで、特に処理はしていません。
import json
def lambda_handler(event, context):
print(f"event: {json.dumps(event)}")
return {
"result": "success"
}
デプロイしてみる
必要モジュールをインストールし、デプロイを行います。
# 必要モジュールのインストール
$ npm i -D @aws-cdk/[email protected]
# デプロイ
$ cdk deploy CdkMySampleAppStack --profile <AWSプロファイル名>
Step Functionsの実行
デプロイが完了すると以下のようなStep Functionsが作成されます。
作成されたStep Functionsに次のようなパラメータを渡し実行してみます。
{
"name": "uehara"
}
最初のステートの出力は以下の通りです。
Lambda関数の返り値として設定した、 result
と s3_uri
がステートの出力パラメータになっていることが分かります。
{
"result": "success",
"s3_uri": "s3://uehara-test-bucket/test"
}
ログにも、きちんと期待された値が出力されていることが確認できました。
それでは、上記をGlueに置き換えたいと思います。
移行後の構成を作成(Glue)
早速ですが、プロジェクトディレクトリは以下の通りです。
※ node_modules
, cdk.out
ディレクトリについては省略しています。
$ tree -I "node_modules|cdk.out" --dirsfirst
.
├── bin
│ └── cdk-my-sample-app.ts
├── lib
│ ├── cdk-my-sample-app-stack.ts
│ ├── glue.ts ★
│ ├── glue_scripts.ts ★
│ ├── iam_role.ts
│ ├── lambda.ts
│ ├── stepfunctions_with_glue.ts ★
│ └── stepfunctions_with_lambda.ts
├── resources
│ ├── glue ★
│ │ ├── common
│ │ │ ├── __init__.py
│ │ │ └── my_utils.py
│ │ ├── job_scripts
│ │ │ └── start_glue.py
│ │ └── pyproject.toml
│ ├── lambda
│ │ ├── common
│ │ │ ├── __init__.py
│ │ │ └── my_utils.py
│ │ ├── end_handler.py
│ │ └── start_handler.py
│ └── lambda_layer
│ └── requirements.txt
├── test
│ └── cdk-sample-app.test.ts
├── README.md
├── cdk.json
├── jest.config.js
├── package-lock.json
├── package.json
└── tsconfig.json
「★」印がついているファイル/ディレクトリが今回新規に作成したものになります。
以下、主要なファイルのみ説明を行います。
主要ファイルの説明
lib/glue_scripts.ts
glue_scripts.ts
はGlueのスクリプトをデプロイするための記述になります。
Lambdaでは先に示した通り common
ディレクトリやhandlerファイルがあるディレクトリを aws_lambda.Function
で直接指定していましたが、GlueはS3への配置を自分で行わなければなりません。
import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';
import { Construct } from 'constructs';
export function deployGlueJobScript(scope: Construct, id: string) {
const destinationBucket = s3.Bucket.fromBucketName(scope, `${id}-bucket`, 'cm-da-uehara');
// GlueスクリプトをS3バケットにアップロード
new cdk.aws_s3_deployment.BucketDeployment(scope, id, {
sources: [cdk.aws_s3_deployment.Source.asset("resources/glue/job_scripts")],
destinationBucket: destinationBucket,
destinationKeyPrefix: 'glue-scripts/',
});
}
export function deployGlueCommonScript(scope: Construct, id: string) {
const destinationBucket = s3.Bucket.fromBucketName(scope, `${id}-bucket`, 'cm-da-uehara');
// commonディレクトリをwhlファイルに変換してS3にアップロード
new cdk.aws_s3_deployment.BucketDeployment(scope, id, {
sources: [
cdk.aws_s3_deployment.Source.asset(
'resources/glue',
{
bundling: {
image: cdk.DockerImage.fromRegistry('python:3.9'),
command: [
'bash',
'-c',
'pip install --user --upgrade pip && ' +
'pip install --user --no-cache-dir build wheel && ' +
'python -m build --wheel && ' +
'cp dist/*.whl /asset-output/common-0.1-py3-none-any.whl && ' +
'rm -rf dist build *.egg-info',
],
user: 'root',
},
}
),
],
destinationBucket: destinationBucket,
destinationKeyPrefix: 'glue-scripts-common/',
});
}
上記の通り、 Glueのスクリプト自体は .py
ファイルを直接S3にアップロードしてますが、共通モジュール群を格納する common
ディレクトリについてはwheel化を行っています。
Glue実行時にはこのwheel化したモジュールを追加ファイルとして指定することで、実行スクリプトから呼び出しを行うことができます。
lib/glue.ts
glue.ts
はGlue Jobそのものを記載するファイルになります。
import * as iam from 'aws-cdk-lib/aws-iam';
import * as cdk from 'aws-cdk-lib';
import * as glue from 'aws-cdk-lib/aws-glue';
import { Construct } from 'constructs';
type dict = { [key: string]: any; }
export function createGlueJob(scope: Construct, id: string, role: iam.Role, environment: dict = {}): glue.CfnJob {
const glueSourceName = id.replace(/-/g, '_')
// Glue Jobの定義
const glueJob = new glue.CfnJob(scope, id, {
name: `${id}`,
role: role.roleArn,
command: {
name: 'pythonshell',
pythonVersion: '3.9',
scriptLocation: `s3://cm-da-uehara/glue-scripts/${glueSourceName}.py`,
},
executionProperty: {
maxConcurrentRuns: 3,
},
defaultArguments: {
'--job-language': 'python',
'--extra-py-files': `s3://cm-da-uehara/glue-scripts-common/common-0.1-py3-none-any.whl`,
'--additional-python-modules': 'pytz==2024.1',
'--environment': JSON.stringify(environment),
},
});
return glueJob;
}
ポイントは defaultArguments
の記載になります。
--extra-py-files
として、先にwheel化した common
モジュールを指定しています。
また、LambdaではLayerとして作成していた pytz==2024.1
を --additional-python-modules
として指定しています。
これにより、Glue起動時に pip
でインストールされる形になります。
インターネットに接続できない環境にある、という場合は、やはり --additional-python-modules
で pip
を利用する形でなく、wheel化をする必要が出てきますのでその点はご留意下さい。
また、Lambdaで環境変数として受け取っていた値についてもキー・バリューの値を JSON.stringify()
をした上で --environment
に渡しています。
lib/stepfunctions_with_glue.ts
stepfunctions_with_glue.ts
は以下の通りです。
import * as cdk from 'aws-cdk-lib';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { Construct } from 'constructs';
export function createStepFunctionsWithGlue(
scope: Construct,
id: string,
stateMachineRole: iam.Role,
startGlue: cdk.aws_glue.CfnJob,
endLambda: cdk.aws_lambda.Function,
): sfn.StateMachine {
const retry: sfn.RetryProps = {
errors: ['States.ALL'],
maxAttempts: 2,
}
// Glue タスク
const startTask = new tasks.GlueStartJobRun(scope, `${id}_start_task`, {
glueJobName: startGlue.name!,
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
arguments: sfn.TaskInput.fromObject({
'--event': sfn.JsonPath.stringAt('States.JsonToString($)'),
'--task_token': sfn.JsonPath.taskToken
}),
timeout: cdk.Duration.minutes(30),
}).addRetry(retry)
// Lambda タスク
const endTask = new tasks.LambdaInvoke(scope, `${id}_end_task`, {
lambdaFunction: endLambda,
outputPath: '$.Payload',
}).addRetry(retry);
// ステートマシンの定義
const definition = startTask.next(endTask);
return new sfn.StateMachine(scope, id, {
stateMachineName: `${id}`,
definitionBody: sfn.DefinitionBody.fromChainable(definition),
timeout: cdk.Duration.hours(1),
role: stateMachineRole,
});
}
移行前ではLambdaにしていた startTask
の部分をGlueに置き換えています。
ここでもポイントは arguments
部分になります。
Lambdaでは event
として取得できていたステートマシンからの入力を、 sfn.JsonPath.stringAt('States.JsonToString($)')
という形にした上で --event
として渡しています。
また、GlueはLambdaのようにスクリプトから return
した値がステートマシンの出力とならない(つぎのステートに入力として渡すことができない)ので、 --task_token
として 実行タスクのトークンを渡し、そちらに明示的に結果を返すようにします。
resources/glue/job_scripts/start_glue.py
移行前の start_handler.py
に該当するのが start_glue.py
となります。
import json
import os
import sys
import boto3
from awsglue.utils import getResolvedOptions
from common import my_utils
SAVE_BUCKET_NAME = None
sfn_client = boto3.client("stepfunctions")
def get_task_token(args):
for i in range(len(args)):
if args[i] == "--task_token":
return args[i + 1]
raise Exception("task_tokenが見つかりませんでした")
def send_success_to_step_functions(task_token, output):
response = sfn_client.send_task_success(taskToken=task_token, output=output)
print(f"response: {response}")
def send_fail_to_step_functions(task_token, error=None, cause=None):
response = sfn_client.send_task_failure(
taskToken=task_token, error=error, cause=cause
)
print(f"response: {response}")
def main(argv):
callback_token = get_task_token(argv)
try:
# argumentの取得
args = getResolvedOptions(argv, ["event", "environment"])
event = json.loads(args["event"])
# 環境変数の設定
env_dict = json.loads(args["environment"])
for key, value in env_dict.items():
os.environ[key.upper()] = str(value)
global SAVE_BUCKET_NAME
SAVE_BUCKET_NAME = os.getenv("SAVE_BUCKET_NAME", "")
print(f"event: {json.dumps(event)}")
name = event.get("name", "")
hello_str = my_utils.get_hello_str(name)
print(f"{hello_str}")
s3_uri = f"s3://{SAVE_BUCKET_NAME}/test"
output = {
"result": "success",
"s3_uri": s3_uri,
}
send_success_to_step_functions(callback_token, json.dumps(output))
except Exception as e:
error = type(e).__name__
cause = str(e)
send_fail_to_step_functions(callback_token, error=error, cause=cause)
if __name__ == "__main__":
main(sys.argv)
引数として与えられた event
を json_loads()
を利用してロードすることにより、Lambdaで event
を取得したときと同じ状態にしています。
また、 event
として渡された環境変数も os
モジュールを使ってsetすることにより、Lambdaで環境変数を与えられた状態と同じ状態にしています。
これらは、既存のLambdaスクリプトの改修を最小限に抑える前処理となります。
Step Functionsへの出力は、 return
ではなく boto3
のStep Functions クライアントにある send_task_success()
や send_task_failure()
を呼び出すことで実現しています。
AWSの公式ドキュメントにも記載されていますが、上記のように .sync
ジョブであっても、途中でタスクトークンに対し SendTaskSuccess
や SendTaskFailure
APIがコールされると、提供されたデータを使用してタスクの完了およびジョブの監視を停止し、ワークフローを続行します。
※ただ、Step FunctionsのAPIのコールそのものが失敗した場合は通常通りGlueのJob Runのエラーとなるため、その点はご留意下さい。
これにより、 Lambda で return
で返していた出力を擬似的に再現しています。
Step FunctionsのAWSサービス毎の統合についてはこちらの公式ドキュメントをご確認下さい。
lib/cdk-my-sample-app-stack.ts
cdk-my-sample-app-stack.ts
は以下です。
先のLambdaの構成に加え、Glueの記載を追記しています。
import * as lambdaPython from '@aws-cdk/aws-lambda-python-alpha';
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import { createLambdaRole, createStepFunctionsRole, createGlueRole } from './iam_role';
import { createLambdaFunction } from './lambda';
import { createStepFunctions } from './stepfunctions_with_lambda';
import { createStepFunctionsWithGlue } from './stepfunctions_with_glue';
import { deployGlueJobScript, deployGlueCommonScript } from './glue_scripts';
import { createGlueJob } from './glue';
export class CdkMySampleAppStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// LambdaLayerの作成
const baseLayer = new lambdaPython.PythonLayerVersion(this, 'BaseLayer', {
entry: 'resources/lambda_layer',
compatibleRuntimes: [lambda.Runtime.PYTHON_3_9],
description: 'The layer containing Python dependencies',
});
const layers = [baseLayer]
// Lambda IAM Role
const lambdaRole = createLambdaRole(this, 'my-lambda-role');
// 環境変数の定義
const env_vars = {
SAVE_BUCKET_NAME: 'uehara-test-bucket',
}
// Lambda Function
const startLambda = createLambdaFunction(this, 'start-handler', lambdaRole, layers, env_vars);
const endLambda = createLambdaFunction(this, 'end-handler', lambdaRole, layers, {})
// Step Functions IAM Role
const stepFunctionsRole = createStepFunctionsRole(this, 'my-sfn-role');
// Step Functions with Lambda
const stepFunctions = createStepFunctions(this, 'my-sfn-with-lambda', stepFunctionsRole, startLambda, endLambda);
// Glueのスクリプトをデプロイ
deployGlueCommonScript(this, 'deploy-glue-common-script')
deployGlueJobScript(this, 'deploy-glue-job-script');
// Glue IAM Role
const glueRole = createGlueRole(this, 'my-glue-role');
// Glue
const startGlue = createGlueJob(this, 'start-glue', glueRole, env_vars);
// StepFunctions with Glue
const stepFunctions_with_glue
= createStepFunctionsWithGlue(this, 'my-sfn-with-glue', stepFunctionsRole, startGlue, endLambda);
}
}
デプロイしてみる
デプロイについては先と同様になります。
# デプロイ
$ cdk deploy CdkMySampleAppStack --profile <AWSプロファイル名>
Step Functionsの実行
デプロイが完了すると、1つ目のステートがLamdaからGlueになっているStep Functionsが作成されます。
作成されたStep Functionsに次のようなパラメータを渡し実行してみます。
{
"name": "uehara"
}
最初のステートの出力は以下の通りです。
Lambda関数の構成と同様、Glueについても返り値として設定した result
と s3_uri
がステートの出力パラメータになっていることが分かります。
{
"result": "success",
"s3_uri": "s3://uehara-test-bucket/test"
}
ログにも、きちんと期待された値が出力されていることが確認できました。
最後に
今回は、Step Functions内のLambdaをGlue (Python Shell)にAWS CDKを利用して置き換えてみました。
参考になりましたら幸いです。